Skip to content

feat: add lifecycle-bound SSE stream and typed per-endpoint adapter#153

Merged
OmarAlJarrah merged 3 commits into
mainfrom
feat/sse-stream-lifecycle
Jun 28, 2026
Merged

feat: add lifecycle-bound SSE stream and typed per-endpoint adapter#153
OmarAlJarrah merged 3 commits into
mainfrom
feat/sse-stream-lifecycle

Conversation

@OmarAlJarrah

@OmarAlJarrah OmarAlJarrah commented Jun 17, 2026

Copy link
Copy Markdown
Member

Summary

Adds a lifecycle-bound Server-Sent Events stream and a reusable typed adapter, so streaming endpoints can be consumed safely and decoded into models without per-API conventions leaking into core.

SseStream — AutoCloseable Iterable bound to the response (#35)

The SSE surface was a bare Sequence whose reader explicitly disclaimed ownership of the response, so a partial consume could strand the connection. SseStream now wraps the WHATWG parser and owns the response:

  • AutoCloseable, Iterable<ServerSentEvent>. Closing it — explicitly, via use {} / try-with-resources, or implicitly when iteration runs to completion — closes the underlying response/body and releases its pooled connection. This mirrors the close-on-partial-consume invariant PagedIterable enforces.
  • The previously doc-only "do not iterate twice" warning is now an enforced single-pass guard; iteration after close() is rejected; close() is idempotent and safe to call concurrently (e.g. cancellation from another thread).
  • Reader exceptions (mid-stream drops) propagate to the caller, but the response is released first, so even a consumer iterating without use {} never strands the connection; any failure to release is attached to the reader error as a suppressed throwable.
  • Response.sseStream() extension opens a stream bound to the response body lifecycle.

TypedSseStream<T> + SseEventMapper<T> — per-endpoint typed adapter (#62)

A reusable runtime adapter turning SseStream into AutoCloseable Iterable<T> by applying a caller-supplied (eventName, data) -> Result<T> mapper:

  • The mapper returns a decoded value, Skip (keep-alives, bare cursors), or a Done sentinel that ends the stream and closes it. It is the seam where the Serde SPI is invoked and where per-API done-sentinel / error-envelope conventions live — core holds none of them.
  • Lazy per-element decode: the mapper (and any Serde deserialize inside it) runs only when the consumer pulls the next element, so a partial consume decodes only the events taken.
  • Closing the adapter propagates to the underlying stream and response.
  • SseStream.typed(mapper) extension for ergonomic wrapping.

This is a hand-written runtime primitive — no code generation — fully usable today; a generator can target it later without embedding any per-API convention in core.

Tests

SseStreamTest and TypedSseStreamTest cover: full-iteration auto-close, explicit close, use {} close on partial consume, idempotent close, single-pass and is-closed guards, out-of-band close mid-iteration, reader-exception propagation, Response.sseStream() lifecycle binding and the no-body error, typed mapping via a recording Deserializer, lazy per-element decode, Skip/Done handling, mapper-exception propagation, and close propagation.

Gated build (scoped, --no-daemon)

./gradlew :sdk-core:test :sdk-core:ktlintCheck :sdk-core:detekt :sdk-core:apiCheck --no-daemon

Result: BUILD SUCCESSFUL. :sdk-core:apiDump was run and the regenerated sdk-core/api/sdk-core.api is committed.

Closes #35
Closes #62

Introduce SseStream: an AutoCloseable Iterable<ServerSentEvent> that owns
the underlying HTTP response. Closing the stream — explicitly, via use {} /
try-with-resources, or implicitly when iteration runs to completion — closes
the response and releases its connection, so a partial consume never strands
the body. This mirrors the close-on-partial-consume invariant PagedIterable
enforces. The previously doc-only "do not iterate twice" warning is now an
enforced single-pass guard, and iteration after close is rejected.

Add a reusable per-endpoint adapter, TypedSseStream<T>, that maps raw events
to typed models via a caller-supplied SseEventMapper. The mapper receives the
event name and joined data and returns a decoded value, Skip, or a Done
sentinel; it is the seam where the Serde SPI is invoked and where per-API
done-sentinel and error-envelope conventions live. Mapping is applied lazily,
one element at a time, so a partial consume decodes only the events taken.
Closing the typed adapter propagates to the underlying stream.

Both surfaces are hand-written runtime primitives usable today; a code
generator can target them later without embedding any per-API convention in
core.

Closes #35
Closes #62
SseStream and TypedSseStream only released the underlying response when
iteration reached a clean end-of-stream. A mid-stream reader failure (a
dropped connection) or a throwing event mapper propagated the error without
closing, so any consumer that iterated without `use {}` — a bare for-loop or
`toList()` — stranded the response body and its pooled connection. The
success path auto-closed, which made bare iteration look safe right up until
an error hit.

Release the response on those error paths too. Close is idempotent, so a
surrounding `use {}` still works; a failure to release is attached to the
original error as a suppressed throwable instead of masking it.

Make automatic end-of-stream cleanup tolerant of a close failure as well: the
events were already delivered, so a failing resource close on the final pull
(clean EOS, or a TypedSseStream done-sentinel) must not turn a fully-read
stream into a thrown result and discard the collected events. An explicit
close() still propagates a release failure — that is the caller asking to
release, so they own it.

Drop the redundant ReentrantLock around the close: the AtomicBoolean
compare-and-set already guarantees the resource is closed exactly once, so
only the CAS winner ever entered the lock. Clarify the threading docs —
cancelling a stream blocked inside an in-flight read surfaces as an
IOException to the iterating thread, not a clean end.
Automatic end-of-stream cleanup swallows a failure to release the response,
because the events have already been delivered and letting the failure
propagate would discard a fully-read stream. Swallowing it silently hides a
real I/O problem, so emit it through ClientLogger at WARN (event
`sse.close.failed`) with the cause attached. An explicit close() still
propagates the failure, and the suppressed-onto-the-primary path for an
in-flight reader/mapper error is unchanged.

Centralize the quiet release in SseStream.releaseQuietly so the TypedSseStream
done-sentinel and mapper-error paths share that one site instead of
duplicating the close handling.

The regenerated API snapshot reflects the logger now threaded through
SseStream's private constructor; the public factory surface is unchanged.
@OmarAlJarrah OmarAlJarrah merged commit 4bdaead into main Jun 28, 2026
1 check passed
@OmarAlJarrah OmarAlJarrah deleted the feat/sse-stream-lifecycle branch June 28, 2026 11:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Generate a per-endpoint SSE adapter mapping events to typed models Make the SSE stream an AutoCloseable Iterable bound to response close

1 participant